Python websocket.WebSocketApp方法代码示例 您所在的位置:网站首页 py websocketapp 建立wss Python websocket.WebSocketApp方法代码示例

Python websocket.WebSocketApp方法代码示例

2024-07-10 15:10| 来源: 网络整理| 查看: 265

本文整理汇总了Python中websocket.WebSocketApp方法的典型用法代码示例。如果您正苦于以下问题:Python websocket.WebSocketApp方法的具体用法?Python websocket.WebSocketApp怎么用?Python websocket.WebSocketApp使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在websocket的用法示例。

在下文中一共展示了websocket.WebSocketApp方法的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于系统推荐出更棒的Python代码示例。

示例1: testSockMaskKey # 需要导入模块: import websocket [as 别名] # 或者: from websocket import WebSocketApp [as 别名] def testSockMaskKey(self): """ A WebSocketApp should forward the received mask_key function down to the actual socket. """ def my_mask_key_func(): pass def on_open(self, *args, **kwargs): """ Set the value so the test can use it later on and immediately close the connection. """ WebSocketAppTest.get_mask_key_id = id(self.get_mask_key) self.close() app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func) app.run_forever() # if numpu is installed, this assertion fail # Note: We can't use 'is' for comparing the functions directly, need to use 'id'. # self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func)) 开发者ID:birforce,项目名称:vnpy_crypto,代码行数:23,代码来源:test_websocket.py 示例2: connect # 需要导入模块: import websocket [as 别名] # 或者: from websocket import WebSocketApp [as 别名] def connect(self, apiKey, secretKey, trace=False): self.host = OKEX_USD_CONTRACT self.apiKey = apiKey self.secretKey = secretKey self.trace = trace websocket.enableTrace(trace) self.ws = websocket.WebSocketApp(self.host, on_message=self.onMessage, on_error=self.onError, on_close=self.onClose, on_open=self.onOpen) self.thread = Thread(target=self.ws.run_forever, args=(None, None, 60, 30)) self.thread.start() # ---------------------------------------------------------------------- 开发者ID:birforce,项目名称:vnpy_crypto,代码行数:20,代码来源:vnokex.py 示例3: run # 需要导入模块: import websocket [as 别名] # 或者: from websocket import WebSocketApp [as 别名] def run(self): token = self.client['config/auth.token'] device_id = self.client['config/app.device_id'] server = self.client['config/auth.server'] server = server.replace('https', "wss") if server.startswith('https') else server.replace('http', "ws") wsc_url = "%s/embywebsocket?api_key=%s&device_id=%s" % (server, token, device_id) LOG.info("Websocket url: %s", wsc_url) self.wsc = websocket.WebSocketApp(wsc_url, on_message=self.on_message, on_error=self.on_error) self.wsc.on_open = self.on_open while not self.stop: self.wsc.run_forever(ping_interval=10) if self.stop: break time.sleep(5) self.client['callback_ws']('WebSocketRestarting') LOG.info("---


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有